﻿using EventBus.Common;
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventBus.RabbitMQ
{
    public class RabbitMqPublisher : IRabbitMqPublisher
    {
        private readonly IConnectionChannel _connectionChannel;
        private readonly IModel _channel;
        IBasicProperties properties;
        ConcurrentDictionary<ulong, string> confirmedDictionary = new ConcurrentDictionary<ulong, string>();
        public RabbitMqPublisher(IConnectionChannel connectionChannel)
        {
            _connectionChannel = connectionChannel;
            _channel = _connectionChannel.GetChannel() ?? throw new Exception("创建Channel失败");
            //声名交换机
            _channel.ExchangeDeclare(exchange: _connectionChannel.ExchangeName, type: "topic", durable: true);
            //启用发布者确认
            _channel.ConfirmSelect();
            //肯定确认
            _channel.BasicAcks += (s, e) =>
            {
                //多条
                if (e.Multiple)
                {
                    var confirmed = confirmedDictionary.Where(k => k.Key <= e.DeliveryTag);
                    foreach (var entry in confirmed)
                    {
                        confirmedDictionary.TryRemove(entry.Key, out _);
                    }
                }
                //单条
                else
                {
                    confirmedDictionary.TryRemove(e.DeliveryTag, out _);
                }
            };
            //否定确认
            _channel.BasicNacks += (s, e) =>
            {
                //多条
                if (e.Multiple)
                {
                    var confirmed = confirmedDictionary.Where(k => k.Key <= e.DeliveryTag);
                    foreach (var entry in confirmed)
                    {
                        confirmedBasicNackLog(entry.Key);
                    }
                }
                //单条
                else
                {
                    confirmedBasicNackLog(e.DeliveryTag);
                }
            };
        }

        /// <summary>
        /// 否定确认日志
        /// </summary>
        /// <param name="sequenceNumber"></param>
        public void confirmedBasicNackLog(ulong sequenceNumber)
        {
            confirmedDictionary.TryGetValue(sequenceNumber, out string body);
            Console.WriteLine(body);
            confirmedDictionary.TryRemove(sequenceNumber, out _);
        }

        /// <summary>
        /// 发布事件
        /// </summary>
        /// <returns></returns>
        public void Publish(string queueName, object content)
        {
            try
            {
                var message = SwifterJsonSerializer.SerializeObject(content);
                confirmedDictionary.TryAdd(_channel.NextPublishSeqNo, message);
                var body = Encoding.Default.GetBytes(message);
                //队列持久化
                //var dictionary = new Dictionary<string, object>();
                //dictionary.TryAdd(queueName, content);
                properties = _channel.CreateBasicProperties();
                properties.Persistent = true;
                //properties.Headers = dictionary;
                //发送消息
                _channel.BasicPublish(_connectionChannel.ExchangeName, queueName, properties, body);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                throw;
            }
        }

        /// <summary>
        /// 异步发布事件
        /// </summary>
        /// <returns></returns>
        public async Task PublishAsync(string queueName, object content)
        {
            try
            {
                var message = SwifterJsonSerializer.SerializeObject(content);
                confirmedDictionary.TryAdd(_channel.NextPublishSeqNo, message);
                var body = Encoding.Default.GetBytes(message);
                //队列持久化
                properties = _channel.CreateBasicProperties();
                properties.Persistent = true;
                //发送消息
                _channel.BasicPublish(_connectionChannel.ExchangeName, queueName, properties, body);
                await Task.CompletedTask;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                throw;
            }
        }

    }

}
